[Router] Add Valkey memory backend with TLS support #1739
rootfs merged 13 commits into vllm-project:main from feature/valkey-memory-backend
Conversation
Signed-off-by: Daria Korenieva <daric2612@gmail.com>
Extract TLS configuration into buildValkeyTLSConfig helper to satisfy cyclop (complexity 13 > 12) and nestif linters. Signed-off-by: Daria Korenieva <daric2612@gmail.com>
@Xunzhuo This PR is ready for review. Would appreciate any feedback when you get a chance. Thanks!
Pull request overview
Adds a Valkey (Search module) implementation of the router "memory" Store, selectable via `global.stores.memory.backend: valkey`, including TLS wiring, plus docs/config examples and tests to validate the new backend.
Changes:
- Introduce `ValkeyStore` implementing the memory `Store` interface (vector retrieval via `FT.SEARCH`, HNSW index init, hybrid reranking, access tracking).
- Add config surfaces for `memory.backend` + `memory.valkey.*` (Go runtime config + CLI model), and wire backend selection + TLS setup in the extproc router.
- Add documentation, example configs, and unit/integration tests for the Valkey backend.
Reviewed changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| website/sidebars.ts | Adds a docs sidebar category entry for the Valkey memory deployment guide. |
| website/docs/tutorials/plugin/memory.md | Documents that the memory plugin requires a configured backing store and mentions Valkey backend. |
| website/docs/tutorials/global/stores-and-tools.md | Adds global configuration examples for Milvus vs Valkey memory backends. |
| website/docs/installation/valkey-memory.md | New deployment/config/tuning/troubleshooting guide for Valkey as memory backend. |
| src/vllm-sr/cli/models_memory.py | Adds CLI-side Pydantic model for Valkey memory config + backend selector field. |
| src/semantic-router/pkg/memory/valkey_store.go | Implements Valkey-backed memory Store (index init, CRUD, retrieve, list, delete-by-scope, etc.). |
| src/semantic-router/pkg/memory/valkey_store_helpers.go | Helper logic (result parsing, retry/backoff, escaping, access tracking, hybrid rerank). |
| src/semantic-router/pkg/memory/valkey_store_test.go | Unit tests for Valkey helper behavior + TLS config fields. |
| src/semantic-router/pkg/memory/valkey_store_integration_test.go | Integration tests against a live Valkey + Search module instance. |
| src/semantic-router/pkg/memory/valkey_store_integration_validation_test.go | Integration validation tests (bad inputs, disabled-store behavior, TLS config propagation). |
| src/semantic-router/pkg/memory/consolidation.go | Refactors ConsolidateUser to be backend-agnostic via the Store interface. |
| src/semantic-router/pkg/extproc/router_memory.go | Adds backend selection (milvus/valkey), Valkey client + TLS construction, and shared Redis cache wrapping. |
| src/semantic-router/pkg/config/runtime_config.go | Adds MemoryConfig.Backend and MemoryValkeyConfig to runtime YAML config surface. |
| src/semantic-router/pkg/config/reference_config_global_test.go | Extends reference-config coverage assertions to include global.stores.memory.valkey. |
| e2e/config/config.memory-user-valkey.yaml | Adds an E2E config profile using the Valkey memory backend. |
| deploy/examples/runtime/memory/valkey.yaml | Adds a Valkey memory backend example configuration (including TLS knobs). |
| deploy/examples/runtime/README.md | Updates runtime examples README to mention memory/vector-store config references. |
| config/config.yaml | Extends the repo’s canonical example config to include memory.backend and a memory.valkey block. |
```go
embeddingConfig := &memory.EmbeddingConfig{
    Model:     memory.EmbeddingModelType(detectMemoryEmbeddingModel(cfg)),
    Dimension: vc.Dimension,
}
```
Fixed. createValkeyMemoryStore now normalizes vc.Dimension before constructing the EmbeddingConfig or calling NewValkeyStore. If vc.Dimension <= 0, the dimension is derived from the resolved embedding model: 256 for mmbert, 384 for all others. This ensures the FT index dimension and the embedding dimension always agree, regardless of whether the user sets valkey.dimension explicitly.
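The fallback described above can be sketched as a small pure function. This is an illustrative reconstruction, not the PR's exact code: the function name `normalizeDimension` is hypothetical, while the 256/384 values and the `mmbert` model name come from the comment.

```go
package main

import "fmt"

// normalizeDimension sketches the dimension fallback: when the configured
// dimension is unset or invalid (<= 0), derive it from the resolved
// embedding model so the FT index dimension and the embedding dimension
// always agree. An explicit positive value always wins.
func normalizeDimension(configured int, model string) int {
    if configured > 0 {
        return configured
    }
    if model == "mmbert" {
        return 256
    }
    return 384
}

func main() {
    fmt.Println(normalizeDimension(0, "mmbert"))    // falls back to 256
    fmt.Println(normalizeDimension(0, "qwen3"))     // falls back to 384
    fmt.Println(normalizeDimension(1024, "mmbert")) // explicit value wins
}
```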
```go
key := v.hashKey(memory.ID)

err = v.retryWithBackoff(ctx, func() error {
    _, hsetErr := v.client.HSet(ctx, key, fields)
    return hsetErr
})
if err != nil {
    status = "error"
    return fmt.Errorf("valkey HSET failed for memory id=%s: %w", memory.ID, err)
}
```
Fixed. ValkeyStore.Store now does an EXISTS check before HSET. If the key already exists it returns "memory already exists: <id>", matching the Store interface contract and the behaviour of InMemoryStore/MilvusStore. TestValkeyStoreInteg_DuplicateKeys has been updated to expect an error on the second Store call and verify the original content is unchanged.
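The enforced-uniqueness contract can be illustrated with a minimal sketch. The real store talks to Valkey through valkey-glide; here a hypothetical `hashClient` interface and a map-backed fake stand in for it, and `storeOnce` is an invented name for the check-then-write shape.

```go
package main

import "fmt"

// hashClient is a hypothetical subset of the Valkey client used by the
// store; the real implementation goes through valkey-glide.
type hashClient interface {
    Exists(key string) (bool, error)
    HSet(key string, fields map[string]string) error
}

// storeOnce sketches the contract described above: probe with EXISTS and
// refuse to overwrite an existing memory, matching InMemoryStore/MilvusStore.
func storeOnce(c hashClient, id string, fields map[string]string) error {
    exists, err := c.Exists("memory:" + id)
    if err != nil {
        return err
    }
    if exists {
        return fmt.Errorf("memory already exists: %s", id)
    }
    return c.HSet("memory:"+id, fields)
}

// fakeClient is a map-backed stand-in for demonstration.
type fakeClient struct{ data map[string]map[string]string }

func (f *fakeClient) Exists(key string) (bool, error) { _, ok := f.data[key]; return ok, nil }
func (f *fakeClient) HSet(key string, fields map[string]string) error {
    f.data[key] = fields
    return nil
}

func main() {
    c := &fakeClient{data: map[string]map[string]string{}}
    fmt.Println(storeOnce(c, "m1", map[string]string{"content": "first"}) == nil) // true
    err := storeOnce(c, "m1", map[string]string{"content": "second"})
    fmt.Println(err)                            // memory already exists: m1
    fmt.Println(c.data["memory:m1"]["content"]) // first
}
```

Note that a separate EXISTS check is not atomic with the HSET; under concurrent writers of the same ID a set-if-absent primitive (or a small script) would close that window.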
```go
// Fetch limit+1 pages worth of data so we can sort client-side and still
// respect the limit. We over-fetch by a factor to allow client-side sorting
// by created_at (FT.SEARCH does not support ORDER BY on NUMERIC fields
// without SORTABLE in all Valkey Search versions).
// The total count comes from the FT.SEARCH header element.
fetchLimit := limit * 5
if fetchLimit < 100 {
    fetchLimit = 100
}
if fetchLimit > 10000 {
    fetchLimit = 10000
}

searchCmd := []string{
    "FT.SEARCH", v.indexName, filterExpr,
    "RETURN", "7", "id", "content", "user_id", "memory_type", "metadata", "created_at", "updated_at",
    "LIMIT", "0", strconv.Itoa(fetchLimit),
```
Fixed. List now uses SORTBY created_at DESC with LIMIT 0 <limit> directly in the FT.SEARCH command, since created_at is already declared SORTABLE in the index schema. The over-fetch multiplier (fetchLimit = limit * 5) and the client-side sort.Slice have been removed.
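The revised command shape can be sketched as follows. The `RETURN` fields are taken from the diff; the index name and filter in the example are placeholders, and the argument order follows the RediSearch-style `FT.SEARCH` syntax that Valkey Search implements.

```go
package main

import (
    "fmt"
    "strconv"
    "strings"
)

// buildListSearch sketches the revised List query: because created_at is
// declared SORTABLE in the index schema, sorting is pushed into FT.SEARCH
// via SORTBY ... DESC, and LIMIT uses the caller's limit directly, so the
// over-fetch multiplier and client-side sort.Slice are no longer needed.
func buildListSearch(indexName, filterExpr string, limit int) []string {
    return []string{
        "FT.SEARCH", indexName, filterExpr,
        "RETURN", "7", "id", "content", "user_id", "memory_type", "metadata", "created_at", "updated_at",
        "SORTBY", "created_at", "DESC",
        "LIMIT", "0", strconv.Itoa(limit),
    }
}

func main() {
    cmd := buildListSearch("memory_idx", "@user_id:{alice}", 20)
    fmt.Println(strings.Join(cmd, " "))
}
```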
```go
// Sync metadata JSON with the authoritative HASH fields.
// We read the current metadata, overwrite access_count and last_accessed
// with the values we just wrote atomically above, and write it back.
// This avoids the previous read-modify-write race: even if two goroutines
// run concurrently, each writes the post-increment count it received from
// HINCRBY, so the JSON converges to the correct value.
fields, err := v.client.HGetAll(ctx, key)
if err != nil {
    return nil // Non-critical: top-level HASH fields are already updated
}
if metadataStr, ok := fields["metadata"]; ok && metadataStr != "" {
    var metadata map[string]interface{}
    if jsonErr := json.Unmarshal([]byte(metadataStr), &metadata); jsonErr == nil {
        metadata["last_accessed"] = now.Unix()
        // Use the authoritative count returned by HINCRBY instead of
        // incrementing the stale JSON value.
        metadata["access_count"] = valkeyToInt64(newCount)
        if updated, mErr := json.Marshal(metadata); mErr == nil {
            _, _ = v.client.HSet(ctx, key, map[string]string{"metadata": string(updated)})
        }
```
Fixed by removing access_count from the metadata JSON entirely. The change is end-to-end: valkeyBuildHashFields no longer writes access_count into the metadata blob; valkeyApplyMetadata no longer reads it from there; valkeyFieldsToMemory reads access_count from the authoritative top-level HASH field (set atomically by HINCRBY); recordRetrieval only updates last_accessed in metadata and explicitly deletes any stale access_count key. This eliminates the race entirely without needing a Lua script.
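The metadata-side half of that change can be sketched as a pure JSON transform. The function name `refreshMetadata` is hypothetical; the behaviour (write only `last_accessed`, delete any stale `access_count`, leave the count to the top-level HASH field) follows the description above.

```go
package main

import (
    "encoding/json"
    "fmt"
)

// refreshMetadata sketches the revised recordRetrieval behaviour: only
// last_accessed is written into the metadata JSON, and any stale
// access_count key left over from older records is deleted. The
// authoritative access_count lives in a top-level HASH field maintained
// atomically by HINCRBY, so no counter ever round-trips through JSON.
func refreshMetadata(metadataJSON string, nowUnix int64) (string, error) {
    var metadata map[string]interface{}
    if err := json.Unmarshal([]byte(metadataJSON), &metadata); err != nil {
        return "", err
    }
    metadata["last_accessed"] = nowUnix
    delete(metadata, "access_count") // drop the stale copy; the HASH field is canonical
    out, err := json.Marshal(metadata)
    return string(out), err
}

func main() {
    updated, err := refreshMetadata(`{"access_count":3,"topic":"go"}`, 1700000000)
    fmt.Println(err, updated)
}
```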
```go
result := store
if rc := cfg.Memory.RedisCache; rc != nil && rc.Enabled && rc.Address != "" {
    cacheCfg := &memory.RedisCacheConfig{
        Address:    rc.Address,
        Password:   rc.Password,
        DB:         rc.DB,
        KeyPrefix:  rc.KeyPrefix,
        TTLSeconds: rc.TTLSeconds,
    }
    redisCache, err := memory.NewRedisCache(ctx, cacheCfg)
    if err != nil {
        logging.Warnf("Memory: Redis cache disabled (connection failed: %v)", err)
    } else {
        result = memory.NewCachingStore(store, redisCache)
    }
}
```
Fixed. NewCachingStore now accepts a backendLabel string parameter. createMemoryStore passes the resolved backend name ("milvus" or "valkey") so cache hit/miss metrics are correctly labeled per backend. The CachingStore.Retrieve method uses this label when calling RecordMemoryCacheHit/RecordMemoryCacheMiss.
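The labeling change can be illustrated with a stripped-down sketch. The `metricsRecorder` counter here is an invented stand-in for the real `RecordMemoryCacheHit`/`RecordMemoryCacheMiss` helpers; the point shown is only that the wrapper carries the resolved backend name and uses it as the label.

```go
package main

import "fmt"

// metricsRecorder is a stand-in for the real metrics helpers; it just
// counts hits and misses per backend label.
type metricsRecorder struct{ hits, misses map[string]int }

// labeledCache sketches the change: the caching wrapper is constructed with
// the resolved backend name ("milvus" or "valkey") and applies that label
// on every cache hit or miss it records.
type labeledCache struct {
    backendLabel string
    rec          *metricsRecorder
}

func (c *labeledCache) recordHit()  { c.rec.hits[c.backendLabel]++ }
func (c *labeledCache) recordMiss() { c.rec.misses[c.backendLabel]++ }

func main() {
    rec := &metricsRecorder{hits: map[string]int{}, misses: map[string]int{}}
    valkeyCache := &labeledCache{backendLabel: "valkey", rec: rec}
    valkeyCache.recordHit()
    valkeyCache.recordMiss()
    valkeyCache.recordHit()
    fmt.Println(rec.hits["valkey"], rec.misses["valkey"]) // 2 1
}
```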
```go
func TestValkeyStoreInteg_DuplicateKeys(t *testing.T) {
    store, _ := setupValkeyMemoryIntegration(t)
    ctx := context.Background()

    id := fmt.Sprintf("mem_dup_%d", time.Now().UnixNano())

    // Store first version
    require.NoError(t, store.Store(ctx, &Memory{
        ID: id, Type: MemoryTypeSemantic,
        Content: "First version", UserID: "dup_user",
    }))
    time.Sleep(200 * time.Millisecond)

    // Store again with same ID (HSET overwrites)
    require.NoError(t, store.Store(ctx, &Memory{
        ID: id, Type: MemoryTypeSemantic,
        Content: "Second version", UserID: "dup_user",
    }))
    time.Sleep(200 * time.Millisecond)

    // Verify latest content
    retrieved, err := store.Get(ctx, id)
    require.NoError(t, err)
    assert.Equal(t, "Second version", retrieved.Content)
}
```
Updated. TestValkeyStoreInteg_DuplicateKeys now expects an error on the second Store call (assert.Contains(t, err.Error(), "memory already exists")) and verifies the original content is unchanged via Get. The test comment has been updated to reflect the enforced-uniqueness semantics.
…mantic-router into feature/valkey-memory-backend

Summary
- Add a Valkey memory backend, selectable via `backend: valkey` in config
- Implement the `Store` interface: Store, Retrieve (vector similarity via `FT.SEARCH`), Get, Update, List, Forget, ForgetByScope, with HNSW indexing, hybrid reranking, adaptive threshold, and access tracking
- Refactor `ConsolidateUser` from a `*MilvusStore` receiver to a standalone function accepting the `Store` interface, for backend-agnostic consolidation
- Add TLS support (`tls_enabled`, `tls_ca_path`, `tls_insecure_skip_verify`) using the valkey-glide native TLS API
- Fix `recordRetrieval` to keep metadata JSON `access_count` consistent under concurrent retrievals

Test plan
- `make test-semantic-router` — all packages pass (0 failures)
- `make go-lint` — 0 issues
- `make check-go-mod-tidy` — clean
- Ran the `config.memory-user-valkey.yaml` profile against Valkey + Search module
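The three TLS knobs from the summary map naturally onto Go's `crypto/tls`. This is a sketch of how such a helper could look, assuming standard-library primitives only; the PR's actual `buildValkeyTLSConfig` helper, which feeds valkey-glide, may differ in detail.

```go
package main

import (
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "os"
)

// buildTLSConfig sketches wiring tls_enabled, tls_ca_path, and
// tls_insecure_skip_verify into a *tls.Config. A nil config means the
// connection stays plaintext.
func buildTLSConfig(enabled bool, caPath string, insecureSkipVerify bool) (*tls.Config, error) {
    if !enabled {
        return nil, nil
    }
    cfg := &tls.Config{
        MinVersion:         tls.VersionTLS12,
        InsecureSkipVerify: insecureSkipVerify, // for dev/test environments only
    }
    if caPath != "" {
        pem, err := os.ReadFile(caPath)
        if err != nil {
            return nil, fmt.Errorf("read CA file %s: %w", caPath, err)
        }
        pool := x509.NewCertPool()
        if !pool.AppendCertsFromPEM(pem) {
            return nil, fmt.Errorf("no valid certificates in %s", caPath)
        }
        cfg.RootCAs = pool // trust only the configured CA
    }
    return cfg, nil
}

func main() {
    cfg, err := buildTLSConfig(true, "", true)
    fmt.Println(err == nil, cfg != nil, cfg.InsecureSkipVerify)
}
```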